AIP-72: Handling up_for_retry task instance states #45070
def _is_eligible_to_retry(task_instance, task_retries: int):
    """
    Check whether the task instance is eligible for retry.

    :param task_instance: the task instance

    :meta private:
    """
    if task_instance.state == State.RESTARTING:
        # A task in the RESTARTING state is always eligible for retry
        return True

    return task_retries and task_instance.try_number <= task_instance.max_tries
Mainly a port of airflow/airflow/models/taskinstance.py, lines 1053 to 1072 in 088242a:
def _is_eligible_to_retry(*, task_instance: TaskInstance):
    """
    Is task instance is eligible for retry.

    :param task_instance: the task instance

    :meta private:
    """
    if task_instance.state == TaskInstanceState.RESTARTING:
        # If a task is cleared when running, it goes into RESTARTING state and is always
        # eligible for retry
        return True
    if not getattr(task_instance, "task", None):
        # Couldn't load the task, don't know number of retries, guess:
        return task_instance.try_number <= task_instance.max_tries

    if TYPE_CHECKING:
        assert task_instance.task

    return task_instance.task.retries and task_instance.try_number <= task_instance.max_tries
I also split it up, because "task_instance.task" is not available here.
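To make the split concrete, here is a minimal, self-contained sketch of the eligibility check with the retry count passed in as an argument instead of being read from "task_instance.task". `StubTaskInstance` and the `RESTARTING` constant are hypothetical stand-ins for illustration, not Airflow classes, and the `bool(...)` wrapper is an addition here so the function always returns a boolean rather than a falsy `0`.

```python
from dataclasses import dataclass

# Hypothetical stand-in for the RESTARTING state value (not imported from Airflow).
RESTARTING = "restarting"


@dataclass
class StubTaskInstance:
    """Hypothetical stand-in carrying only the fields the check reads."""

    state: str
    try_number: int
    max_tries: int


def is_eligible_to_retry(task_instance: StubTaskInstance, task_retries: int) -> bool:
    """Return True when the task instance may be retried."""
    if task_instance.state == RESTARTING:
        # A restarting task is always eligible, regardless of the retry budget.
        return True
    # The retry count comes from the caller, so no task object is needed here.
    return bool(task_retries) and task_instance.try_number <= task_instance.max_tries
```

With this shape, the caller decides where `task_retries` comes from, so the check itself never needs the task object.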
except (AirflowTaskTimeout, AirflowException):
    # Couldn't load the task, don't know number of retries, guess
    if not getattr(ti, "task", None):
        # Let us set the task_retries to default = 0
        msg = RetryTask(
            end_date=datetime.now(tz=timezone.utc),
            task_retries=0,
        )
    else:
        msg = RetryTask(
            end_date=datetime.now(tz=timezone.utc),
            # is `or 0` needed?
            task_retries=ti.task.retries or 0,
        )
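As an illustration only, the two branches above can be collapsed into a single fallback lookup. The sketch below uses a hypothetical `RetryTaskStub` dataclass in place of the real `RetryTask` message (only the two fields shown in the diff are modelled) and a hypothetical helper name, `build_retry_message`. In this sketch the `or 0` covers both a missing task and a `retries` value of `None`; whether `None` can actually occur in Airflow is not something this thread establishes.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from types import SimpleNamespace


@dataclass
class RetryTaskStub:
    """Hypothetical stand-in for the RetryTask message in the diff."""

    end_date: datetime
    task_retries: int


def build_retry_message(ti) -> RetryTaskStub:
    """Build the retry message, defaulting task_retries to 0 when unknown."""
    task = getattr(ti, "task", None)
    # getattr on None falls through to the default; `or 0` also maps None to 0.
    retries = getattr(task, "retries", None) or 0
    return RetryTaskStub(end_date=datetime.now(tz=timezone.utc), task_retries=retries)


# Usage: a task instance without a loaded task falls back to zero retries.
no_task = build_retry_message(SimpleNamespace())
with_task = build_retry_message(SimpleNamespace(task=SimpleNamespace(retries=3)))
```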
As for the API and this PR, all we should care about is whether we should retry or not. The task ran, complained that it needs to retry, so we send a retry API call. The core logic of how retry works should be out of the scope of this PR.
We do not need this PR anymore; this is handled in #45106.